package com.hao.chapter11;

import com.hao.chapter05.ClickSource;
import com.hao.chapter05.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

public class TableFlow {
    public static void main(String[] args) throws Exception {
        //1.创建流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //读取数据源
        SingleOutputStreamOperator<Event> dataStream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));

        //2.创建表环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //3.将DataStream 转换成 Table
        Table eventTable = tableEnv.fromDataStream(dataStream);

        //4.注册临时表
        tableEnv.createTemporaryView("clickTable",eventTable);

        //5.编写sql操作
        String sql = "select `user`,count(url) as cnt from clickTable group by user";
        String sql1 = "select `user` from clickTable";
        //6.执行sql
        Table aggResult = tableEnv.sqlQuery(sql);
        Table result = tableEnv.sqlQuery(sql1);

        //7.输出 (流转换成表)
        tableEnv.toChangelogStream(aggResult).print("agg");
        tableEnv.toDataStream(result).print("result");

        //8.执行
        env.execute();
    }
}
